#ifndef __CHUNK_PROTO_H__
#define __CHUNK_PROTO_H__

#include <dirent.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <sys/statvfs.h>
#include <sys/types.h>

#include "list.h"
#include "ylock.h"
#include "chunk.h"
#include "cache.h"
#include "cluster.h"
#include "conn.h"
#include "net_global.h"
#include "ynet_rpc.h"
#include "../replica/replica.h"
#include "../lease/lease_ctl.h"
#include "ec.h"

//extern net_global_t ng;

typedef struct {
        sy_rwlock_t lock;
        io_t io;
        const buffer_t *buf;
        uint32_t magic;
        time_t ltime;
        int retval;
} chunk_local_write_ctx_t;

typedef struct {
        uint64_t clock;
        int count;
        nid_t array[0];
} vfm_t;

#define VFM_SIZE(__count__) (sizeof(vfm_t) + sizeof(nid_t) * __count__)
#define VFM_COUNT_MAX 32

static inline int vfm_exist(const vfm_t *vfm, const nid_t *nid) {
        int i;

        if (unlikely(!vfm)) {
                return 0;
        }

        for (i = 0; i < vfm->count; i++) {
                if (nid->id == vfm->array[i].id)
                        return 1;
        }

        return 0;
}

static inline int vfm_add(vfm_t *vfm, const nid_t *nid)
{
        YASSERT(vfm);

        if (vfm_exist(vfm, nid)) {
                return EEXIST;
        }

        if (vfm->count + 1 > VFM_COUNT_MAX) {
                return EIO;
        }
        
        vfm->array[vfm->count] = *nid;
        vfm->count++;
        vfm->clock++;

        return 0;
}

static inline void vfm_dump(const vfm_t *vfm, char *buf)
{
        int len, i;

        if (vfm) {
                snprintf(buf, MAX_BUF_LEN,  "%ju:", vfm->clock);
                for (i = 0; i < vfm->count; i++) {  
                        len = strlen(buf);
                        snprintf(buf + len, MAX_BUF_LEN - len,  "%s,", network_rname(&vfm->array[i]));
                }
        } else {
                snprintf(buf, MAX_BUF_LEN,  "null");
        }
}

inline static int __vfm_rack_get(const nid_t *nid, char *rack)
{
        int ret;
        char _rack[MAX_NAME_LEN], _node[MAX_NAME_LEN], _disk[MAX_NAME_LEN],
                _site[MAX_NAME_LEN], name[MAX_NAME_LEN];

        network_connect(nid, NULL, 1, 0);

        ret = network_rname1(nid, name);
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
                GOTO(err_ret, ret);
        }
                
        hosts_split(name, _site, _rack, _node, _disk);
        snprintf(rack, MAX_BUF_LEN, "%s.%s", _site, _rack);

        return 0;
err_ret:
        return ret;
}

static int __vfm_rack_add(char rack_array[][LICH_REPLICA_MAX], int *_count, const nid_t *nid)
{
        int ret, count, i;
        char rack[MAX_NAME_LEN];

        count = *_count;

        ret = __vfm_rack_get(nid, rack);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        for(i = 0; i < count; i++) {
                DBUG("%s --> %s\n", rack_array[i], rack);
                
                if (strcmp(rack_array[i], rack) == 0) {
                        ret = EEXIST;
                        GOTO(err_ret, ret);
                }
        }

        strcpy(rack_array[i], rack);
        *_count = count + 1;
        
        return 0;
err_ret:
        return ret;
}

inline static int __vfm_rack_count(const vfm_t *vfm, const nid_t *nid, const chkinfo_t *chkinfo,
                            int *_rack_count1, int *_rack_count2)
{
        int ret, rack_count1, rack_count2, i;
        char rack_array[MAX_NAME_LEN][LICH_REPLICA_MAX];
        
        rack_count1 = 0;
        for(i = 0; i < chkinfo->repnum; i++) {
                ret = __vfm_rack_add(rack_array, &rack_count1, &chkinfo->diskid[i].id);
                if (ret) {
                        if (ret == EEXIST) {
                                continue;
                        } else
                                GOTO(err_ret, ret);
                }
        }

        rack_count2 = 0;
        for(i = 0; i < vfm->count; i++) {
                ret = __vfm_rack_add(rack_array, &rack_count2, &vfm->array[i]);
                if (ret) {
                        if (ret == EEXIST) {
                                continue;
                        } else
                                GOTO(err_ret, ret);
                }
        }

        ret = __vfm_rack_add(rack_array, &rack_count2, nid);
        if (ret) {
                if (ret == EEXIST) {
                        //pass
                } else
                        GOTO(err_ret, ret);
        }

        *_rack_count1 = rack_count1;
        *_rack_count2 = rack_count2;
        
        return 0;
err_ret:
        return ret;
}

inline static int vfm_add_check(vfm_t *vfm, const nid_t *nid, const chkinfo_t *chkinfo)
{
        int ret, tmo, rack_count1, rack_count2;

        if (ng.uptime < gloconf.offline_timeout * 2) {
                network_connect(nid, NULL, 0, 1);
                ret = EBUSY;
                GOTO(err_ret, ret);
        }

        if (!netable_connected(nid)) {
                tmo = gettime() - netable_last_update(nid);
                if (tmo < gloconf.offline_timeout * 2 && conn_online(nid, -1)) {
                        network_connect(nid, NULL, 1, 0);
                        DBUG("chunk "CHKID_FORMAT" @ %s still online, tmo %u\n",
                             CHKID_ARG(&chkinfo->id), network_rname(nid), tmo);
                        ret = EBUSY;
                        GOTO(err_ret, ret);
                }
        }

        if (vfm_exist(vfm, nid)) {//nothing todo
                DBUG("chunk "CHKID_FORMAT" @ %s add vfm exist\n",
                     CHKID_ARG(&chkinfo->id), network_rname(nid));
                goto out;
        }

        if (chkinfo->repnum > (vfm->count + 1)) {//fast path
                ret = vfm_add(vfm, nid);
                if (ret) {
                        UNIMPLEMENTED(__DUMP__);
                }

                DBUG("chunk "CHKID_FORMAT" @ %s add vfm version %llu\n",
                     CHKID_ARG(&chkinfo->id), network_rname(nid), vfm->clock);
        } else {//multi fail
                ret = __vfm_rack_count(vfm, nid, chkinfo, &rack_count1, &rack_count2);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                if (rack_count1 > rack_count2) {
                        ret = vfm_add(vfm, nid);
                        if (ret) {
                                UNIMPLEMENTED(__DUMP__);
                        }

                        DBUG("chunk "CHKID_FORMAT" @ %s add vfm version %llu\n",
                             CHKID_ARG(&chkinfo->id), network_rname(nid), vfm->clock);
                } else {
                        char tmp[MAX_PATH_LEN], tmp1[MAX_PATH_LEN];
                        vfm_dump(vfm, tmp);
                        CHKINFO_STR(chkinfo, tmp1);
                        DBUG("chunk %s @ %s add vfm fallback, vfm %s\n",
                              tmp1, network_rname(nid), tmp);
                        ret = EBUSY;
                        GOTO(err_ret, ret);
                }
        }
        
out:
        return 0;
err_ret:
        return ret;
}

static inline int chkinfo_rack_check(const chkinfo_t *chkinfo)
{
        int ret, i;
        nid_t array[LICH_REPLICA_MAX];

        for(i = 0; i < chkinfo->repnum; i++) {
                array[i] = chkinfo->diskid[i].id;
        }

        ret = rack_check(array, chkinfo->repnum);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}


#define CHUNK_PROTO_EC                  0x0
#define CHUNK_PROTO_REP                 0x1
#define CHUNK_PROTO_MAX                 0x2

struct chunk_proto_ops {
        char name[MAX_NAME_LEN];

        int (*create)(const char *pool, const chkid_t *chkids, int chknum,
                       const nid_t *nids, int repnum, const fileid_t *parent,
                       const nid_t *parentnid, int initzero, uint64_t info_version,
                       const buffer_t *buf, const ec_t *ec);
        int (*unlink)(const chkinfo_t *chkinfo, const chkstat_t *chkstat);

        int (*sha1)(const chkinfo_t *chkinfo, sha1_result_t *result);
        int (*check)(const char *pool, chkinfo_t *chkinfo, chkstat_t *chkstat,
                     vfm_t *vfm, const fileid_t *parent,
                     int force, const lease_token_t *token, const ec_t *ec, int *oflags);
        int (*sync)(const char *pool, chkinfo_t *chkinfo, chkstat_t *chkstat,
                    vfm_t *vfm, const fileid_t *parent,
                    int force, const lease_token_t *token, const ec_t *ec, int *oflags);
        int (*move)(const char *pool, chkinfo_t *_chkinfo, chkstat_t *_chkstat,
                    vfm_t *vfm, const chkid_t *parent,
                    const nid_t *nid, int count, const lease_token_t *token, const ec_t *ec);

        int (*read)(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm,
                const io_t *io, buffer_t *buf, const ec_t *ec);
        int (*write)(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm,
                const io_t *io, const buffer_t *buf, const ec_t *ec);
};

/* from chunk_proto.c */
int chunk_proto_init();
int chunk_proto_ops_register(struct chunk_proto_ops *hook, int type);
struct chunk_proto_ops *chunk_proto_ops_get(const ec_t *ec);

int chunk_magic();
int chunk_connect(const nid_t *nid, const chkid_t *chkid, const chkid_t *parent,
                  int force, const lease_token_t *token, clockstat_t *clockstat, repstat_t *repstat);
int chunk_getclock(const nid_t *nid, const chkid_t *chkid, clockstat_t *clockstat);
void chunk_proto_clock(const chkinfo_t *chkinfo, chkstat_t *chkstat, uint64_t *clock, int op);

int chunk_proto_set(chkinfo_t *chkinfo, chkstat_t *chkstat, const nid_t *nid, int status, int *seted);
void chunk_proto_reset(const chkinfo_t *chkinfo, chkstat_t *chkstat);

int chunk_proto_setparent(const chkinfo_t *chkinfo, const chkid_t *fileid);

int chunk_replica_consistent(const chkid_t *chkid, const reploc_t *reploc, time_t _ltime, const vfm_t *vfm);
int chunk_replica_intact(const chkid_t *chkid, const reploc_t *reploc, time_t _ltime, int *reset);

int chunk_proto_consistent(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm);
int chunk_proto_intact(const chkinfo_t *chkinfo, const chkstat_t *chkstat);
int chunk_proto_unintact(const chkinfo_t *chkinfo, const chkstat_t *chkstat);
int chunk_proto_connected(const chkinfo_t *chkinfo, chkstat_t *chkstat, const vfm_t *vfm);

int chunk_proto_rep_unlink(const chkinfo_t *chkinfo, const chkstat_t *chkstat);

void chunk_local_write(chunk_local_write_ctx_t *ctx,
                        const io_t *io, const buffer_t *buf, uint32_t magic, uint32_t ltime);
int chunk_local_write_wait(chunk_local_write_ctx_t *ctx);


/* from chunk_proto_rep.c */
int chunk_proto_rep_init();

int chunk_proto_rep_read(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm,
                     const io_t *io, buffer_t *buf, const ec_t *ec);
int chunk_proto_rep_write(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm,
                      const io_t *io, const buffer_t *buf, const ec_t *ec);
int chunk_proto_rep_create(const char *pool, const chkid_t *chkids, int chknum,
                           const nid_t *nids, int repnum, const fileid_t *parent,
                           const nid_t *parentnid, int initzero, uint64_t info_version,
                           const buffer_t *buf, const ec_t *ec);
int chunk_proto_rep_check(const char *pool, chkinfo_t *chkinfo, chkstat_t *chkstat,
                          vfm_t *vfm, const fileid_t *parent,
                          int force, const lease_token_t *token,
                          const ec_t *ec, int *oflags);
int chunk_proto_rep_sync(const char *pool, chkinfo_t *chkinfo, chkstat_t *chkstat,
                         vfm_t *vfm, const fileid_t *parent,
                         int force, const lease_token_t *token,
                         const ec_t *ec, int *oflags);
int chunk_proto_rep_move(const char *pool, chkinfo_t *_chkinfo, chkstat_t *_chkstat,
                         vfm_t *vfm, const chkid_t *parent,
                         const nid_t *nid, int count, const lease_token_t *token, const ec_t *ec);

/* from chunk_proto_ec.c */
int chunk_proto_ec_init();
void chunk_proto_ec_strips_init(const io_t *io, const ec_t *ec,
                uint32_t *off, uint32_t *count, buffer_t *strips);
void chunk_proto_ec_strips_free(buffer_t *strips);

int chunk_proto_ec_read(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm,
                     const io_t *io, buffer_t *buf, const ec_t *ec);
int chunk_proto_ec_write(const chkinfo_t *chkinfo, const chkstat_t *chkstat, const vfm_t *vfm,
                      const io_t *io, const buffer_t *buf, const ec_t *ec);
int chunk_proto_ec_write_pull(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                      const io_t *io, const ec_t *ec, uint32_t off, uint32_t count, buffer_t *strips);
int chunk_proto_ec_write_strip(const io_t *io, const buffer_t *buf, const ec_t *ec,
                      uint32_t off, uint32_t count, buffer_t *strips);
int chunk_proto_ec_write_commit(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                      const io_t *io, const ec_t *ec,
                      uint32_t off, uint32_t count, buffer_t *strips);

int chunk_proto_ec_redo(const chkinfo_t *chkinfo, const chkstat_t *chkstat,
                const clockstat_t *clocks, const ec_t *ec, uint32_t off, uint32_t count,
                buffer_t *buf, unsigned char *src_in_err);
int chunk_proto_ec_check_needredo(const char *pool, chkinfo_t *chkinfo, chkstat_t *chkstat, const fileid_t *parent,
                      int op, int force, int tier, int flags, const ec_t *ec,
                      clockstat_t *clockstat, clockstat_t *clocks, unsigned char *src_in_err,
                      int *redo, int *recovery);
int chunk_proto_ec_check_recovery(const char *pool, chkinfo_t *chkinfo, chkstat_t *chkstat, const fileid_t *parent,
                      int op, int tier, int flags, const ec_t *ec,
                      clockstat_t *clockstat, clockstat_t *clocks, unsigned char *src_in_err);

void chkinfo_dump(const chkinfo_t *chkinfo, vfm_t *_vfm, char *buf, int _clock);

#endif
